import 'dart:async';

import 'package:rxdart/rxdart.dart';

import '../../common/test_page.dart';
import '../utils.dart';

Stream<int> _getStream() {
  final controller = StreamController<int>();

  Timer(const Duration(milliseconds: 10), () => controller.add(1));
  Timer(const Duration(milliseconds: 20), () => controller.add(2));
  Timer(const Duration(milliseconds: 30), () => controller.add(3));
  Timer(const Duration(milliseconds: 40), () {
    controller.add(4);
    controller.close();
  });

  return controller.stream;
}

Stream<int> _getOtherStream(int value) {
  final controller = StreamController<int>();

  Timer(const Duration(milliseconds: 15), () => controller.add(value + 1));
  Timer(const Duration(milliseconds: 25), () => controller.add(value + 2));
  Timer(const Duration(milliseconds: 35), () => controller.add(value + 3));
  Timer(const Duration(milliseconds: 45), () {
    controller.add(value + 4);
    controller.close();
  });

  return controller.stream;
}

Stream<int> range() =>
    Stream.fromIterable(const [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]);

class SwitchTestPage extends TestPage {
  SwitchTestPage(super.title) {
    test('Rx.switchIfEmpty.whenEmpty', () async {
      expect(
        Stream<int>.empty().switchIfEmpty(Stream.value(1)),
      );
    });

    test('Rx.initial.completes', () async {
      expect(
        Stream.value(99).switchIfEmpty(Stream.value(1)),
      );
    });

    test('Rx.switchIfEmpty.reusable', () async {
      final transformer = SwitchIfEmptyStreamTransformer<bool>(
          Stream.value(true).asBroadcastStream());

      Stream<bool>.empty().transform(transformer).listen(((result) {
        expect(result);
      }));

      Stream<bool>.empty().transform(transformer).listen(((result) {
        expect(result);
      }));
    });

    test('Rx.switchIfEmpty.whenNotEmpty', () async {
      Stream.value(false)
          .switchIfEmpty(Stream.value(true))
          .listen(((result) {
        expect(result);
      }));
    });

    test('Rx.switchIfEmpty.asBroadcastStream', () async {
      final stream = Stream<int>.empty().switchIfEmpty(Stream.value(1)).asBroadcastStream();

      stream.listen(null);
      stream.listen(null);

      expect(stream.isBroadcast);
    });

    test('Rx.switchIfEmpty.error.shouldThrowA', () async {
      final streamWithError =
      Stream<int>.error(Exception()).switchIfEmpty(Stream.value(1));

      streamWithError.listen(null,
          onError: (Object e, StackTrace s) {
            expect(e);
          });
    });

    test('Rx.switchIfEmpty.pause.resume', () async {
      late StreamSubscription<int> subscription;
      final stream = Stream<int>.empty().switchIfEmpty(Stream.value(1));

      subscription = stream.listen(((value) {
        expect(value);

        subscription.cancel();
      }));

      subscription.pause();
      subscription.resume();
    });

    test('Rx.switchIfEmpty accidental broadcast', () async {
      final controller = StreamController<int>();

      final stream = controller.stream.switchIfEmpty(Stream<int>.empty());

      stream.listen(null);
      expect(() => stream.listen(null));

      controller.add(1);
    });

    test('Rx.switchIfEmpty.nullable', () {
      nullableTest<String?>(
            (s) => s.switchIfEmpty(Stream.value('String')),
      );
    });

    test('Rx.switchMap', () async {
      const expectedOutput = [5, 6, 7, 8];
      var count = 0;

      _getStream().switchMap(_getOtherStream).listen(((result) {
        expect(result);
      }));
    });

    test('Rx.switchMap.reusable', () async {
      final transformer = SwitchMapStreamTransformer<int, int>(_getOtherStream);
      const expectedOutput = [5, 6, 7, 8];
      var countA = 0, countB = 0;

      _getStream().transform(transformer).listen(((result) {
        expect(result);
      }));

      _getStream().transform(transformer).listen(((result) {
        expect(result);
      }));
    });

    test('Rx.switchMap.asBroadcastStream', () async {
      final stream = _getStream().asBroadcastStream().switchMap(_getOtherStream);

      stream.listen(null);
      stream.listen(null);

      expect(true);
    });

    test('Rx.switchMap.error.shouldThrowA', () async {
      final streamWithError =
      Stream<int>.error(Exception()).switchMap(_getOtherStream);

      streamWithError.listen(null,
          onError: (Object e, StackTrace s) {
            expect(e);
          });
    });

    test('Rx.switchMap.error.shouldThrowB', () async {
      final streamWithError = Stream.value(1).switchMap(
              (_) => Stream<void>.error(Exception('Catch me if you can!')));

      streamWithError.listen(null,
          onError: (Object e, StackTrace s) {
            expect(e);
          });
    });

    test('Rx.switchMap.error.shouldThrowC', () async {
      final streamWithError = Stream.value(1).switchMap<void>((_) {
        throw Exception('oh noes!');
      });

      streamWithError.listen(null,
          onError: (Object e, StackTrace s) {
            expect(e);
          });
    });

    test('Rx.switchMap.pause.resume', () async {
      late StreamSubscription<int> subscription;
      final stream = Stream.value(0).switchMap((_) => Stream.value(1));

      subscription = stream.listen(((value) {
        expect(value);

        subscription.cancel();
      }));

      subscription.pause();
      subscription.resume();
    });

    test('Rx.switchMap stream close after switch', () async {
      final controller = StreamController<int>();
      final list = controller.stream
          .switchMap((it) => Stream.fromIterable([it, it]))
          .toList();

      controller.add(1);
      await Future<void>.delayed(Duration(microseconds: 1));
      controller.add(2);

      await controller.close();
      expect(await list);
    });

    test('Rx.switchMap accidental broadcast', () async {
      final controller = StreamController<int>();

      final stream = controller.stream.switchMap((_) => Stream<int>.empty());

      stream.listen(null);
      expect(() => stream.listen(null));

      controller.add(1);
    });

    test('Rx.switchMap closes after the last inner Stream closed - issue/511',
            () async {
          final outer = StreamController<bool>();
          final inner = BehaviorSubject.seeded(false);
          final stream = outer.stream.switchMap((_) => inner.stream);

          expect(stream);

          outer.add(true);
        });

    test('Rx.switchMap every subscription triggers a listen on the root Stream',
            () async {
          var count = 0;
          final controller = StreamController<bool>.broadcast();
          final root =
          OnSubscriptionTriggerableStream(controller.stream, () => count++);
          final stream = root.switchMap((event) => Stream.value(event));

          stream.listen((event) {});
          stream.listen((event) {});

          expect(count);

          await controller.close();
        });

    test('Rx.switchMap.nullable', () {
      nullableTest<String?>(
            (s) => s.switchMap((v) => Stream.value(v)),
      );
    });

  }

}

class OnSubscriptionTriggerableStream<T> extends Stream<T> {
  final Stream<T> inner;
  final void Function() onSubscribe;

  OnSubscriptionTriggerableStream(this.inner, this.onSubscribe);

  @override
  bool get isBroadcast => inner.isBroadcast;

  @override
  StreamSubscription<T> listen(void Function(T event)? onData,
      {Function? onError, void Function()? onDone, bool? cancelOnError}) {
    onSubscribe();
    return inner.listen(onData,
        onError: onError, onDone: onDone, cancelOnError: cancelOnError);
  }
}